using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Providers;
using Orleans.Providers.Streams.Common;
using Orleans.Streams;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    internal class AdapterReceiver<TSerializer> : IQueueAdapterReceiver
        where TSerializer : class, IMemoryMessageBodySerializer
    {
        private readonly List<Task> awaitingTasks;
        private readonly ILogger logger;
        private readonly TSerializer serializer;
        private readonly IQueueAdapterReceiverMonitor receiverMonitor;
        private readonly IGrainFactory grainFactory;
        private Guid guid;
        private string queueId;

        public AdapterReceiver(IGrainFactory grainFactory, Guid guid, string queueId, ILogger logger, TSerializer serializer, IQueueAdapterReceiverMonitor receiverMonitor)
        {
            this.queueId = queueId;
            this.guid = guid;
            this.grainFactory = grainFactory;
            this.logger = logger;
            this.serializer = serializer;
            awaitingTasks = new List<Task>();
            this.receiverMonitor = receiverMonitor;
        }

        public Task Initialize(TimeSpan timeout)
        {
            this.receiverMonitor?.TrackInitialization(true, TimeSpan.MinValue, null);
            return Task.CompletedTask;
        }

        public async Task<IList<IBatchContainer>> GetQueueMessagesAsync(int maxCount)
        {
            var watch = Stopwatch.StartNew();
            List<IBatchContainer> batches = new List<IBatchContainer>();
            Task<IEnumerable<MessageData>> task = null;
            try
            {
                IStreamQueueGrain queueGrain = this.grainFactory.GetGrain<IStreamQueueGrain>(this.guid, this.queueId);

                task = queueGrain.Dequeue(maxCount);
                awaitingTasks.Add(task);
                var eventData = await task;
                batches = eventData.Select(data => new BatchContainer<TSerializer>(data, this.serializer)).ToList<IBatchContainer>();
                watch.Stop();
                this.receiverMonitor?.TrackRead(true, watch.Elapsed, null);
                int count = eventData.Count();
                if (count > 0)
                {
                    var oldestMessage = eventData.First();
                    var newestMessage = eventData.Last();
                    this.receiverMonitor?.TrackMessagesReceived(count, oldestMessage.EnqueueTimeUtc, newestMessage.EnqueueTimeUtc);
                }
            }
            catch (Exception exc)
            {
                watch.Stop();
                this.receiverMonitor?.TrackRead(true, watch.Elapsed, exc);
                Console.WriteLine("error:" + exc.Message);
            }
            finally
            {
                awaitingTasks.Remove(task);
            }



            return batches;
        }

        public Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
        {
            return Task.CompletedTask;
        }

        public async Task Shutdown(TimeSpan timeout)
        {
            var watch = Stopwatch.StartNew();
            try
            {
                if (awaitingTasks.Count != 0)
                {
                    await Task.WhenAll(awaitingTasks);
                }
                watch.Stop();
                this.receiverMonitor?.TrackShutdown(true, watch.Elapsed, null);
            }
            catch (Exception ex)
            {
                watch.Stop();
                this.receiverMonitor?.TrackShutdown(false, watch.Elapsed, ex);
            }
        }
    }
}
